package com.goxplanet;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Calendar;
import java.util.Random;

public class StockTradeSource extends RichParallelSourceFunction<StockTrade> {
    @Override
    public void run(SourceContext<StockTrade> sourceContext) throws Exception {
        Random rand = new Random();
        while (true) {
            long curTime = Calendar.getInstance().getTimeInMillis();
            sourceContext.collect(new StockTrade("HK.0700",curTime, 500 + rand.nextInt(50),10L + rand.nextInt(100) ));
            sourceContext.collect(new StockTrade("HK.9988",curTime, 100 + rand.nextInt(30),10L + rand.nextInt(100) ));
            Thread.sleep(5000);
        }
    }

    @Override
    public void cancel() {

    }
}
